Kafka/long lived producer #83
Codecov Report ❌ Patch coverage is
Additional details and impacted files
    @@            Coverage Diff             @@
    ##             main      #83      +/-   ##
    ==========================================
    + Coverage   83.69%   83.73%   +0.03%
    ==========================================
      Files          13       13
      Lines        1325     1328       +3
    ==========================================
    + Hits         1109     1112       +3
      Misses        216      216
lubomir left a comment
I like the general direction. However, I think we should hold off on further changes until we confirm in stage that the code actually works, so that we don't have to chase a moving target.
    for msg in msgs:
        event = msg.get("event", "event")
        topic = "%s%s" % (conf.messaging_topic_prefix, event)
        producer.send(topic, msg)
Does this actually ever raise any exceptions? It returns a Future immediately, so I would not expect any network issues to appear as exceptions. Adding back the flush() might help.
flush() was dropped to let Kafka handle batching. CTS sends few messages, and linger_ms=0 means they're sent almost immediately anyway.
But you're right, without flush(), send() just returns a Future and delivery errors are never raised. The error recovery code would never trigger. I'll add it back.
The built-in batching is a good point though. I didn't think about that. Maybe it's the error handling that should be removed?
If the retry logic is also handled by Kafka itself, I guess the flush() and error handling can be removed. We would just be missing the ability to log delivery statuses on CTS's side.
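One possible middle ground for the thread above, sketched under the assumption that the client is kafka-python (where `send()` returns a future with `add_callback()` and `add_errback()`): attach callbacks so that delivery results are still logged on the CTS side without forcing a `flush()` on every batch. The function name `send_all`, the logger name, and the `wait` parameter are hypothetical and not taken from the PR.

```python
import functools
import logging

log = logging.getLogger("cts.messaging")  # hypothetical logger name


def send_all(producer, msgs, topic_prefix, wait=False):
    """Send msgs and log each delivery result through future callbacks.

    `producer` is assumed to behave like kafka-python's KafkaProducer:
    send() returns a future offering add_callback() and add_errback().
    Returns the list of (topic, exception) pairs recorded so far; the
    list is only complete once the futures have resolved.
    """
    failures = []

    def on_success(topic, metadata):
        log.debug("Delivered message to %s: %r", topic, metadata)

    def on_error(topic, exc):
        failures.append((topic, exc))
        log.error("Delivery to %s failed: %r", topic, exc)

    for msg in msgs:
        topic = "%s%s" % (topic_prefix, msg.get("event", "event"))
        future = producer.send(topic, msg)  # returns immediately
        future.add_callback(functools.partial(on_success, topic))
        future.add_errback(functools.partial(on_error, topic))

    if wait:
        # Optional: block until buffered messages are sent, which gives
        # up batching across calls in exchange for a complete result.
        producer.flush()
    return failures
```

With `wait=False` the call returns immediately and failures only show up in the log; with `wait=True` the returned list can drive whatever recovery code is kept.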
            _kafka_producer.close()
        except Exception:
            pass
        _kafka_producer = None
This code seems rather fragile. There's a helper to create the producer, but here we still need to touch the global variable directly. Does KafkaProducer have some reconnection logic we could use instead?
Roger. I'll rely on Kafka's built-in reconnection and retry logic instead.
Just realized that we will need to keep _retry_with_backoff for UBM compatibility.
But not a blocker.
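As a rough illustration of the direction agreed in this thread, the sketch below keeps one long-lived producer behind a helper, so callers never touch the global directly, and leaves retries and reconnection to the client's own settings. It assumes kafka-python, whose `KafkaProducer` accepts `retries`, `retry_backoff_ms`, `reconnect_backoff_ms`, `reconnect_backoff_max_ms` and `linger_ms`. The values, the `get_producer` name and the `factory` parameter are illustrative assumptions, not the PR's actual code.

```python
import threading

_kafka_producer = None
_producer_lock = threading.Lock()

# Illustrative values only; tune them for the real deployment.
PRODUCER_SETTINGS = {
    "retries": 5,                      # client-side resend attempts
    "retry_backoff_ms": 500,           # wait between resend attempts
    "reconnect_backoff_ms": 50,        # initial wait before reconnecting
    "reconnect_backoff_max_ms": 10000, # upper bound for reconnect backoff
    "linger_ms": 0,                    # send as soon as possible
}


def _default_factory(**settings):
    # Imported lazily so the module loads without kafka-python installed.
    from kafka import KafkaProducer
    return KafkaProducer(**settings)


def get_producer(factory=_default_factory, **overrides):
    """Return the shared producer, creating it on first use.

    `overrides` (for example bootstrap_servers) are merged into
    PRODUCER_SETTINGS only when the producer is first created.
    """
    global _kafka_producer
    with _producer_lock:
        if _kafka_producer is None:
            settings = dict(PRODUCER_SETTINGS, **overrides)
            _kafka_producer = factory(**settings)
        return _kafka_producer
```

The `factory` argument exists mainly so the helper can be exercised in tests with a stub instead of a real broker connection.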
Addresses points 3 and 4 from the review comments on PR #82:
3. There is batching of messages. The producer does this automatically (partially).
4. The producer is created for each batch of messages. Kafka seems to prefer a long-lived producer that is reused.
Comment ref: #82 (review)
Stacked on: #82